Migrate BibliothecaPurchaseMonitor to Celery tasks #3385

Open
dbernstein wants to merge 17 commits into main from feature/bibliotheca-purchase-monitor-celery


@dbernstein dbernstein commented May 22, 2026

Description

Replaces the legacy script-driven BibliothecaPurchaseMonitor (and its helper BibliothecaTimelineMonitor) with a pair of Celery tasks that page through MARC purchase records one day at a time, chaining via task.replace() until the collection is fully caught up to utc_now().

This is PR 2 of 3 in the Bibliotheca → Celery migration series (PR 1 migrated the event monitor; PR 3 will migrate the circulation sweep).

FYI: I will create a follow-on ticket to remove the dead monitor infrastructure code.


Task architecture

import_purchase_records_for_all_collections (fan-out)

  • Scheduled via Celery beat daily at 4:00 AM.
  • Queries all Bibliotheca collections from the database and enqueues one import_purchase_records_by_collection task per collection.
  • Accepts an optional force_reimport=True flag (see Manual scripts below).

import_purchase_records_by_collection (per-collection worker)

  • Processes a single page of up to 50 MARC records for a one-day window [current_day, day_end], where day_end = min(current_day + 1 day, utc_now()).
  • After each page it re-queues itself via task.replace():
    • Full page (50 records): more records remain for the same calendar day → re-queues with offset advanced by 50, current_day unchanged.
    • Partial page (<50 records): the day is fully processed → re-queues with current_day advanced to day_end and offset reset to 1.
  • The loop terminates naturally when current_day >= utc_now() (collection is up to date).
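The paging rules above can be sketched as a pure decision function. This is a minimal illustration, not the code in this PR: `next_arguments` and `PAGE_SIZE` are hypothetical names, and the real task hands the result to task.replace() rather than returning it.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

PAGE_SIZE = 50  # page size stated in the description above


def next_arguments(
    current_day: datetime, offset: int, records_on_page: int, now: datetime
) -> Optional[Tuple[datetime, int]]:
    """Return (current_day, offset) for the next re-queued task, or None when caught up."""
    if current_day >= now:
        # Collection is up to date: the chain ends here.
        return None
    day_end = min(current_day + timedelta(days=1), now)
    if records_on_page >= PAGE_SIZE:
        # Full page: more records may remain for the same day.
        return current_day, offset + PAGE_SIZE
    # Partial page: the day is done, move to the next window and reset the offset.
    return day_end, 1


now = datetime(2026, 5, 22, 12, tzinfo=timezone.utc)
day = datetime(2026, 5, 20, tzinfo=timezone.utc)
print(next_arguments(day, 1, 50, now))   # same day, offset 51
print(next_arguments(day, 51, 7, now))   # next day, offset 1
print(next_arguments(now, 1, 0, now))    # None
```

Note that the final window is capped at `now`, so the last partial page re-queues once more with `current_day == now` and that invocation exits immediately.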

First run vs. subsequent runs

| | First run | Subsequent run |
|---|---|---|
| Start derived from | DEFAULT_PURCHASE_RECORD_START_TIME (2014-01-01) when no Timestamp exists; otherwise Timestamp.finish | Timestamp.finish (resume from last completed day) |
| Scope | Full historical backfill from 2014-01-01 to now | Only the delta since last run |

Mid-chain invocations receive current_day as an explicit parameter, bypassing get_start() entirely so the stored Timestamp is never consulted again until the chain finishes.
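A rough sketch of the start-time resolution described above. `resolve_start` is a hypothetical helper, not the importer's get_start(); the UTC timezone on the default constant is also an assumption.

```python
from datetime import datetime, timezone
from typing import Optional

# Assumption: the default start is midnight UTC on 2014-01-01.
DEFAULT_PURCHASE_RECORD_START_TIME = datetime(2014, 1, 1, tzinfo=timezone.utc)


def resolve_start(
    current_day: Optional[datetime], stored_finish: Optional[datetime]
) -> datetime:
    """Pick the day to start from for one task invocation."""
    if current_day is not None:
        # Mid-chain (or forced reimport): the explicit parameter wins and
        # the stored Timestamp is not consulted.
        return current_day
    if stored_finish is None:
        # No Timestamp yet, or a Timestamp with a null finish: full backfill.
        return DEFAULT_PURCHASE_RECORD_START_TIME
    # Subsequent run: resume from the last checkpoint.
    return stored_finish
```

Passing DEFAULT_PURCHASE_RECORD_START_TIME as `current_day` is how a forced reimport would skip the stored Timestamp under this sketch.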


Timestamp checkpointing

Timestamp.finish is updated after every page — not just at the end of the chain — to guard against crashes mid-backfill:

  • Day still in progress (page was full): finish = current_day. A restart will re-process the current day from offset 1 rather than falling back to a previous day.
  • Day fully processed (partial page): finish = day_end. The next beat trigger picks up from the next day.
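The checkpoint rule reduces to a single choice per page. A minimal sketch, assuming a `next_offset` value that is None when the page was partial (as in DayImportResult); `checkpoint_finish` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def checkpoint_finish(
    current_day: datetime, day_end: datetime, next_offset: Optional[int]
) -> datetime:
    """Value to store in Timestamp.finish after one page has been processed."""
    if next_offset is not None:
        # Day still in progress: a restart re-processes this day from offset 1.
        return current_day
    # Day complete: the next run starts from the following window.
    return day_end


day = datetime(2026, 5, 20, tzinfo=timezone.utc)
day_end = day + timedelta(days=1)
print(checkpoint_finish(day, day_end, next_offset=51))    # 2026-05-20 00:00:00+00:00
print(checkpoint_finish(day, day_end, next_offset=None))  # 2026-05-21 00:00:00+00:00
```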

Failure modes

| Failure | Behaviour |
|---|---|
| BadResponseException / RequestTimedOut | Auto-retried up to 4 times with 60 s exponential back-off. The Redis workflow lock is held across retries (via ignored_exceptions) so a transient API error does not open a window for a second concurrent run. |
| RemoteIntegrationException | Declared in throws= and treated as a terminal, non-retriable error. The exception is logged and discarded; Celery does not mark the task as failed. This approach mirrors other Celery importer routines. |
| Redis lock already held on first invocation | The task logs a warning and returns immediately, leaving the in-progress chain undisturbed. |
| Redis lock expires mid-chain (rare) | The task logs a warning ("workflow lock expired between invocations") and continues processing; a concurrent run may briefly exist for the same collection. |
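For a sense of scale, the retry schedule can be estimated as below. This is only an approximation: it assumes the delay doubles on each retry starting from a 60 s factor, a 600 s cap (Celery's default retry_backoff_max), and no jitter, whereas Celery applies random jitter by default. `backoff_delays` is a hypothetical helper, and the task's actual retry settings are not shown in this description.

```python
from typing import List


def backoff_delays(factor: int = 60, max_retries: int = 4, cap: int = 600) -> List[int]:
    """Upper-bound delay in seconds before each retry, doubling every attempt."""
    return [min(cap, factor * 2**attempt) for attempt in range(max_retries)]


print(backoff_delays())       # [60, 120, 240, 480]
print(sum(backoff_delays()))  # 900
```

Under these assumptions a page that fails on every attempt waits up to roughly 15 minutes in total before giving up, which fits well inside the 2 hour lock timeout.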

Redis workflow lock

  • Key: ["PurchaseRecordCollectionWorkflow", <collection-redis-key>]
  • Lock timeout: 2 hours (generous ceiling for a large historical backfill page).
  • A UUID lock_value is generated on the first invocation and forwarded unchanged to every subsequent re-queued task. Because task.replace() raises celery.exceptions.Ignore, Ignore is listed in ignored_exceptions so the lock is not released when the task replaces itself.
  • The lock key is distinct from the event-import lock so both workflows can run concurrently for the same collection without blocking each other.
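The lock hand-off between invocations can be illustrated with an in-memory stand-in. This is a sketch of the behaviour described above, not the Redis-backed implementation: `InMemoryLock`, `start_invocation`, and the outcome strings are all hypothetical.

```python
import uuid
from typing import Dict, Optional, Tuple


class InMemoryLock:
    """Toy replacement for the Redis lock: maps a key to its holder's value."""

    def __init__(self) -> None:
        self._held: Dict[str, str] = {}

    def holder(self, key: str) -> Optional[str]:
        return self._held.get(key)

    def acquire(self, key: str, value: str) -> bool:
        # Succeeds when the key is free or already held with the same value.
        if self._held.get(key) in (None, value):
            self._held[key] = value
            return True
        return False

    def expire(self, key: str) -> None:
        self._held.pop(key, None)


def start_invocation(
    lock: InMemoryLock, key: str, lock_value: Optional[str] = None
) -> Tuple[str, Optional[str]]:
    """Return (outcome, lock_value to forward to the next re-queued task)."""
    if lock_value is None:
        # First invocation: generate the value that the whole chain will share.
        lock_value = str(uuid.uuid4())
        if not lock.acquire(key, lock_value):
            return "skipped", None  # another chain is already in progress
        return "started", lock_value
    # Mid-chain: the forwarded value should still hold the lock.
    still_ours = lock.holder(key) == lock_value
    lock.acquire(key, lock_value)  # re-take it if it expired and is free
    return ("continued" if still_ours else "continued-after-expiry"), lock_value
```

In this model a second beat trigger arriving while a chain is running gets "skipped", while a re-queued task carrying the original value keeps going even if the lock has lapsed.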

MARC record processing

Each MARC record goes through BibliothecaPurchaseRecordImporter._process_record():

  1. Extracts the Bibliotheca ID from MARC field 001 (logs an error and skips if missing or ambiguous — i.e. zero or more than one 001 field).
  2. Creates or finds the LicensePool via LicensePool.for_foreign_id().
  3. Calls api.bibliographic_lookup() and, for each result, checks bibliographic.needs_apply() (hash-based deduplication). Only changed metadata queues a bibliographic_apply task on the apply queue; unchanged records produce no database writes.
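Steps 1 and 3 can be approximated with plain Python. This is a hedged sketch: `extract_bibliotheca_id`, `content_hash`, and this standalone `needs_apply` are hypothetical helpers, and the SHA-256-over-JSON hash is an assumption rather than the hashing scheme the codebase actually uses.

```python
import hashlib
import json
from typing import Optional, Sequence


def extract_bibliotheca_id(control_numbers: Sequence[str]) -> Optional[str]:
    """Return the single 001 value, or None when it is missing or ambiguous."""
    if len(control_numbers) != 1:
        # The real importer logs an error here and skips the record.
        return None
    return control_numbers[0]


def content_hash(metadata: dict) -> str:
    """Stable hash of a metadata payload (key order does not matter)."""
    canonical = json.dumps(metadata, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def needs_apply(metadata: dict, stored_hash: Optional[str]) -> bool:
    """True when the payload differs from what was last applied."""
    return content_hash(metadata) != stored_hash


record = {"title": "Example", "publisher": "Example House"}
stored = content_hash(record)
print(extract_bibliotheca_id(["abc123"]))                 # abc123
print(extract_bibliotheca_id([]))                         # None
print(needs_apply(record, stored))                        # False
print(needs_apply({**record, "title": "New"}, stored))    # True
```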

Manual scripts

bin/bibliotheca_purchase_record_import (CLI entry point) dispatches ImportPurchaseRecordCollection:

# Import a single named collection
bin/bibliotheca_purchase_record_import --collection "My Bibliotheca"

# Queue all collections
bin/bibliotheca_purchase_record_import --import-all

# Force a full reimport from 2014-01-01, ignoring the stored Timestamp
bin/bibliotheca_purchase_record_import --collection "My Bibliotheca" --force-reimport
bin/bibliotheca_purchase_record_import --import-all --force-reimport

--force-reimport passes current_day=DEFAULT_PURCHASE_RECORD_START_TIME directly to import_purchase_records_by_collection.delay() (or, for --import-all, sets force_reimport=True on import_purchase_records_for_all_collections.delay() which fans it out to each collection). This completely bypasses the stored Timestamp, restarting the full backfill from 2014-01-01.
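The argument handling exercised by the script tests (missing args, conflicting args) could be modelled with argparse as below. Whether ImportPurchaseRecordCollection is built this way is an assumption; `build_parser` is a hypothetical helper and only the flag names come from the examples above.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="bibliotheca_purchase_record_import")
    # Exactly one target must be given: a named collection or all collections.
    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--collection", help="Name of a single collection to import")
    target.add_argument("--import-all", action="store_true", help="Queue all collections")
    parser.add_argument(
        "--force-reimport",
        action="store_true",
        help="Ignore the stored Timestamp and restart from the default start date",
    )
    return parser


args = build_parser().parse_args(["--collection", "My Bibliotheca", "--force-reimport"])
print(args.collection, args.import_all, args.force_reimport)
# My Bibliotheca False True
```

With this layout, passing neither target or both targets makes argparse exit with a usage error.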


Motivation and Context

JIRA: (PP-4391)

The three Bibliotheca monitors (event, purchase, circulation sweep) are driven by external cron jobs invoking one-shot scripts backed by a legacy Monitor base-class hierarchy. Migrating to Celery gives us:

  • Centralised scheduling via beat (no external cron dependency)
  • task.replace() to avoid tying up a worker thread for the full duration of a large historical backfill
  • Standard retry/back-off handling via autoretry_for instead of bespoke try/except loops
  • Deletion of several hundred lines of monitor scaffolding

How Has This Been Tested?

~60 tests across five test files — all passing:

  • test_bibliotheca.py: TestBibliothecaImportPurchaseRecordsForAllCollections and TestBibliothecaImportPurchaseRecordsByCollection: first-run default start, stored-timestamp resume, already-up-to-date early exit, full-page replace with advanced offset, partial-page replace with advanced day, force_reimport flag propagation, lock-held skip/warning, lock-expired mid-chain warning, lock forwarded across replaces, lock not released on autoretry, lock independence from event-import lock.
  • test_bibliotheca_purchase_record_importer.py: get_start() with and without prior timestamp; import_day() window bounds, cutoff capping, timestamp stamping (mid-day vs. day-complete), record count, offset forwarding, next-offset logic; _process_record() happy path, missing control number, multiple control numbers, bibliographic_apply queued/skipped based on needs_apply().
  • test_bibliotheca_scripts.py: both ImportEventCollection and ImportPurchaseRecordCollection: --import-all, --collection, --force-reimport, unknown collection error, missing args error, conflicting args error.

Checklist

  • I have updated the documentation accordingly.
  • All new and existing tests passed.

@dbernstein dbernstein added the feature New feature label May 22, 2026

claude Bot commented May 22, 2026

Claude finished @dbernstein's task in 7m 29s


Summary

Well-structured migration with clean task architecture, a thorough test suite (~60 tests), and consistent use of needs_apply() for deduplication across both importers. No correctness bugs found. The two findings below are a minor behaviour change worth noting and a nit on an overcounted log metric.


Details

Minor: src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py:197-203

The new _process_record calls self._api.bibliographic_lookup() — an HTTP request — for every MARC record regardless of whether the LicensePool is new or pre-existing. The legacy BibliothecaPurchaseMonitor.process_record only called ensure_coverage when is_new=True, so incremental runs previously made zero extra API calls for titles already in the collection. Under the new implementation, a --force-reimport from 2014-01-01 will issue one HTTP call per MARC record across all historical pages; for a large collection that can be tens of thousands of calls. needs_apply() prevents redundant DB writes and queued tasks, but the round-trips to Bibliotheca still happen. This is consistent with how BibliothecaEventImporter._handle_event is implemented, so it is at least a coherent pattern — just worth acknowledging since force-reimport is the most expensive path and the API volume is much higher than the legacy behaviour.

for bibliographic in self._api.bibliographic_lookup(bibliotheca_id):
    if bibliographic.needs_apply(self._session):
        apply.bibliographic_apply.delay(
            bibliographic,
            collection_id=self._collection.id,
            replace=ReplacementPolicy.from_license_source(),
        )


Nit: src/palace/manager/celery/tasks/bibliotheca.py:305-309

The log line says "fetched {result.records_fetched} record(s)" but records_fetched counts every record the API returned including ones that were skipped (no control number, multiple control numbers). A record skipped in _process_record is still counted, so the metric overstates how many records were actually processed. The field name records_fetched is accurate at the importer level but the log phrasing "fetched … record(s)" in the task layer reads as "processed", which may mislead operators monitoring logs during a backfill. Renaming the log token to "received" or "fetched from API" (vs. "processed") would remove the ambiguity.

f"Bibliotheca purchase record import: fetched {result.records_fetched} record(s) for "
f"'{collection_name}' "
f"({result.day_start.strftime('%Y-%m-%dT%H:%M:%S')} -> "
f"{result.day_end.strftime('%Y-%m-%dT%H:%M:%S')}, offset {offset})."
)



codecov Bot commented May 22, 2026

Codecov Report

❌ Patch coverage is 99.25926% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 93.39%. Comparing base (d8611d4) to head (d0717dc).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/palace/manager/celery/tasks/bibliotheca.py | 97.95% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3385      +/-   ##
==========================================
+ Coverage   93.37%   93.39%   +0.01%     
==========================================
  Files         503      504       +1     
  Lines       46126    46158      +32     
  Branches     6323     6325       +2     
==========================================
+ Hits        43070    43107      +37     
+ Misses       1980     1973       -7     
- Partials     1076     1078       +2     

Comment thread src/palace/manager/celery/tasks/bibliotheca.py Outdated
Comment thread src/palace/manager/celery/tasks/bibliotheca.py Outdated
Comment thread src/palace/manager/celery/tasks/bibliotheca.py Outdated
Comment thread src/palace/manager/celery/tasks/bibliotheca.py Outdated
Comment thread src/palace/manager/celery/tasks/bibliotheca.py Outdated
@dbernstein dbernstein force-pushed the feature/bibliotheca-purchase-monitor-celery branch from f56fc86 to 6a34bf8 Compare May 22, 2026 21:24
Comment thread src/palace/manager/celery/tasks/bibliotheca.py Outdated
Comment thread src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py Outdated
@dbernstein dbernstein force-pushed the feature/bibliotheca-purchase-monitor-celery branch from 6803451 to 0755fca Compare May 27, 2026 17:55
Comment thread src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py Outdated
Comment thread src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py Outdated
@dbernstein dbernstein force-pushed the feature/bibliotheca-purchase-monitor-celery branch 2 times, most recently from 947d402 to 491e857 Compare May 27, 2026 20:55
@dbernstein dbernstein requested a review from a team May 28, 2026 18:25
dbernstein and others added 13 commits May 29, 2026 14:37
Replace the legacy script-driven BibliothecaPurchaseMonitor with a
Celery-native purchase_collection task that processes one day of MARC
records per invocation and chains days via task.replace() until the
collection is caught up.

- Add BibliothecaPurchaseImporter (bibliotheca_purchase_importer.py)
  with get_start/import_day/_purchases/_process_record; uses hash-based
  bibliographic_apply.delay() instead of BibliothecaBibliographicCoverageProvider
- Add purchase_all_collections and purchase_collection Celery tasks with
  Redis workflow lock (_purchase_workflow_lock, independent key from
  import_workflow_lock), autoretry for BadResponseException/RequestTimedOut,
  and replace-per-day chaining
- Add daily beat schedule entry (4:00 AM) in celery.py
- Add ImportPurchaseCollection script + bin/bibliotheca_purchase_import
  for manual trigger (mirrors ImportEventCollection from PR 1)
- Delete BibliothecaTimelineMonitor, BibliothecaPurchaseMonitor,
  RunBibliothecaPurchaseMonitorScript from bibliotheca.py and remove
  now-dead imports
- Delete bin/bibliotheca_purchase_monitor (replaced by Celery beat)
- Add default_language_version: python3.12 to .pre-commit-config.yaml
  so check-ast can parse Python 3.12 generic class syntax already present
  in bibliotheca.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace %-style log arguments and implicit f-string + bare-string
concatenations with f-strings throughout the new purchase monitor files.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove TestBibliothecaPurchaseMonitor and
  TestBibliothecaPurchaseMonitorWhenMultipleCollections from
  test_bibliotheca.py, along with the now-dead BibliothecaPurchaseMonitor
  and TimestampData imports
- Add return type Iterator[MagicMock] to mock_marc_request in
  test_bibliotheca_purchase_importer.py, removing the unnecessary
  type: ignore[no-untyped-def] comment

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously import_day fetched all pages for a day in a single loop,
allowing a single task to process an unbounded number of records if a
day had many purchases. This brings the purchase monitor in line with
every other paginated Celery task in the codebase.

Changes:
- import_day(current_day, cutoff, offset=1) now fetches exactly one
  API page (up to _MARC_PAGE_SIZE=50 records) and returns
  DayImportResult.next_offset: set to offset+50 when the page was full
  (more records remain), or None when the page was partial (day done)
- Remove _purchases() — pagination now lives at the task level via
  task.replace(), consistent with the event-import and Overdrive patterns
- Timestamp is checkpointed after every page: finish=current_day while
  the day is in progress (so a worker crash restarts from this day, not
  the previous one), finish=day_end when the day completes
- purchase_collection gains an offset: int = 1 parameter and handles two
  replace paths: same day + next offset, or next day + offset reset to 1
- Tests updated: replace test_paginates_marc_request with
  test_returns_next_offset_when_page_is_full,
  test_returns_no_next_offset_when_page_is_partial, and
  test_passes_offset_to_marc_request; add
  test_replaces_with_next_offset_when_page_full to task tests;
  tighten test_replaces_when_more_days_remain to assert offset=1

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All identifiers introduced for the purchase-record sync now carry the
word "Record" to make the concept explicit: BibliothecaPurchaseRecordImporter,
PURCHASE_RECORD_SERVICE_NAME, import_purchase_records_for_all_collections,
import_purchase_records_by_collection, the Redis lock key, beat-schedule
entry, bin script, and all test fixtures/classes. Pure mechanical rename;
no logic changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cover the five cases that mirror TestImportEventCollection:
--import-all, --collection <name>, unknown collection,
no args, and both args simultaneously.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds a --force-reimport flag to ImportPurchaseRecordCollection and a
matching force_reimport parameter to import_purchase_records_for_all_collections.
When set, the import starts from DEFAULT_PURCHASE_RECORD_START_TIME (2014-01-01)
instead of resuming from the stored Timestamp.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
No functional change; remove the decorative `# ---` block separating
the event-import and purchase-record-import sections in both the task
module and its test file, as requested in the PR review.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The variable tracks whether lock_value is None (i.e. this is the first
invocation of the workflow chain), not whether we are processing the
first calendar day. Rename to is_first_invocation for clarity, and
update the matching warning log message and test assertion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace the vague observation that "collections typically go back to
2014-01-01" with an explanation of where the value came from and why
we are preserving it, as requested in the PR review.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cover the branch in BibliothecaPurchaseRecordImporter._process_record
that skips (and logs an error for) a MARC record that contains more
than one 001 field, as flagged in the PR review.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Both BibliothecaEventImporter and BibliothecaPurchaseRecordImporter are
Celery tasks, not legacy monitors, so their Timestamp records should
carry service_type=TASK_TYPE rather than MONITOR_TYPE. Update both
importers and their test fixtures accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The previous commit updated the source importer but missed
test_bibliotheca_importer.py, which was still stamping and looking up
Timestamps with MONITOR_TYPE. The mismatch caused two CI failures;
this aligns the test fixtures with the updated importer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
dbernstein and others added 4 commits May 29, 2026 14:37
- Move DEFAULT_PURCHASE_RECORD_START_TIME, _MARC_PAGE_SIZE, and
  DayImportResult out of inline test-body imports to the top of
  test_bibliotheca.py.

- test_replaces_when_more_days_remain: capture stored_finish and assert
  the replace current_day equals stored_finish + 1 day, rather than the
  previous weak 'is not None' check.

- test_replaces_with_next_offset_when_page_full: replace the
  BibliothecaAPI-level mock (which drove 50 records through
  _process_record's no-control-number error branch) with a direct mock
  of import_day returning a DayImportResult with next_offset set.

- Both test_lock_value_passed_through_on_replace tests: replace
  isinstance(lock_value, str) with UUID(lock_value) to confirm the
  generated value is a well-formed UUID.

- Add test_prior_timestamp_with_null_finish_returns_default_start:
  covers the explicit 'if timestamp.finish is None' branch in
  get_start() that was previously untested.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The field counted all records returned by the API page — including those
silently skipped by _process_record due to a missing or duplicate 001
control-number field. Calling it "handled" (and the achievement string
"MARC records processed") implied successful processing, which overstated
actual work done and obscured error-log churn from bad records.

Renamed to records_fetched and updated the achievement string and task
log message to "fetched" to accurately reflect that this is a raw API
page count, not a successfully-processed count.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Private helper functions do not need docstrings.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Keep the brief description but drop the verbose explanation of the
distinct-key rationale (which was the part flagged in review).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@dbernstein dbernstein force-pushed the feature/bibliotheca-purchase-monitor-celery branch from 6da8d36 to d0717dc Compare May 29, 2026 21:37